[SPARK-56661] Implementing UDFDispatcherManager for new UDF worker sessions #55712

Open. sven-weber-db wants to merge 1 commit into apache:master from sven-weber-db:sven-weber_data/SPARK-56661-udf-changes

Conversation

sven-weber-db (Contributor) commented May 6, 2026

What changes were proposed in this pull request?

This PR implements a UDFDispatcherManager class in the new /udf package that was initiated by SPIP SPARK-55278. The new manager class provides a single entry point through which Spark can create a UDF session to an external UDF worker, based on a WorkerSpecification instance. This manager and entry point will be used by follow-up PRs to implement new, language-agnostic Catalyst nodes.

Why are the changes needed?

The UDFDispatcherManager serves two main purposes:

  1. Provide a single, unified entry point to Spark for UDF worker/session creation.
  2. Manage UDF WorkerDispatcher instances, keyed by the UDFWorkerSpecification they were created for. This is required because the UDF framework newly proposed in SPIP SPARK-55278 enables clients to specify different UDF dispatchers for their UDFs. This implies:

2.1. Multiple, different dispatchers can exist at the same time
-> The right one needs to be selected to create a UDF session
2.2. Dispatcher lifetime needs to be managed
-> Dispatchers and their resources need to be cleaned up once clients no longer need them
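The two purposes above can be sketched as follows. This is a minimal illustration of the single entry-point idea, not the PR's actual code; `WorkerSpecification`, `WorkerSession`, and `createSession` are assumptions based on the PR description and may differ from the final API.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the PR's types.
final case class WorkerSpecification(language: String)
final class WorkerSession(val spec: WorkerSpecification)

final class UDFDispatcherManager {
  // Per-specification state; in the real PR this would hold a dispatcher
  // per spec. Here a session counter models dispatcher reuse.
  private val sessionsPerSpec = mutable.Map.empty[WorkerSpecification, Int]

  // The single entry point: select (or create) state for `spec`,
  // then hand out a new session bound to it.
  def createSession(spec: WorkerSpecification): WorkerSession = synchronized {
    sessionsPerSpec.updateWith(spec)(c => Some(c.getOrElse(0) + 1))
    new WorkerSession(spec)
  }

  def sessionCount(spec: WorkerSpecification): Int = synchronized {
    sessionsPerSpec.getOrElse(spec, 0)
  }
}
```

Two calls with the same specification reuse the same per-spec entry, which is the selection behavior described in point 2.1.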

Does this PR introduce any user-facing change?

No. All changes are marked as Experimental and are not yet consumed.

How was this patch tested?

New unit tests were added for the changes in UDFDispatcherManager and WorkerSession.

Was this patch authored or co-authored using generative AI tooling?

Partially. However, the code was manually reviewed and adjusted.

@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661-udf-changes branch 2 times, most recently from 83e1033 to 184fc44 Compare May 6, 2026 14:25
@sven-weber-db sven-weber-db changed the title [SPARK-56324] Implementing UDFWorkerManager for new UDF worker sessions [SPARK-56661] Implementing UDFWorkerManager for new UDF worker sessions May 6, 2026
@sven-weber-db sven-weber-db marked this pull request as ready for review May 6, 2026 14:43

// Must be called while holding `lock`.
private def handleSessionTermination(
workerSpec: UDFWorkerSpecification
Contributor:
maybe we shall also pass the session object here?

sven-weber-db (Contributor, Author) replied May 7, 2026:

Do you mean move the activeSessions.remove(session) call into this function like so?

private def handleSessionTermination(
    session: WorkerSession,
    workerSpec: UDFWorkerSpecification): Unit = {
  activeSessions.remove(session)

  val entry = dispatchers.get(workerSpec)
  // `entry == null` is unexpected here; fail fast instead of
  // NPE-ing on the decrement below.
  require(entry != null, s"No dispatcher registered for $workerSpec")
  entry.activeSessionCount -= 1
  if (entry.activeSessionCount == 0) {
    logger.info("All sessions closed for dispatcher " +
      s"${entry.dispatcher.dispatcherId}, removing from cache")
    dispatchers.remove(workerSpec)
    onAllDispatcherSessionsClosed(entry.dispatcher)
  }
}

@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661-udf-changes branch 2 times, most recently from c1e56ab to 7c77e8d Compare May 7, 2026 08:54
@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661-udf-changes branch from 7c77e8d to 1f11959 Compare May 7, 2026 11:57
@sven-weber-db sven-weber-db changed the title [SPARK-56661] Implementing UDFWorkerManager for new UDF worker sessions [SPARK-56661] Implementing UDFDispatcherManager for new UDF worker sessions May 7, 2026
* Not called during [[UDFDispatcherManager#stop]] -- the manager
* cleans up dispatchers it holds directly in that case.
*/
def onAllDispatcherSessionsClosed(
Contributor:

it may be better to include this per dispatcher, rather than in this factory.

sven-weber-db (Contributor, Author) replied May 8, 2026:

In my view, the point of this function is to decide how long a dispatcher is kept around. E.g. in a scenario where clients provide different worker specs, we cannot keep every dispatcher for the whole Spark runtime; there needs to be some sort of timeout, especially once we implement warm-pooling.

Should this dispatcher timeout be controlled by each dispatcher instead of a central place? The re-use logic may also depend on how many dispatchers currently exist. E.g. if we only need one dispatcher for the whole Spark lifetime, there is no need for eager cleanup. Therefore, I would argue it's more suitable to decide this in the factory, which knows the number and kind of dispatchers currently in existence.
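The argument above can be made concrete with a small sketch: a factory-level callback can consult global state (here, the number of live dispatchers) that an individual dispatcher cannot see. All names here are illustrative assumptions, not the PR's API.

```scala
// Hypothetical dispatcher abstraction for illustration only.
trait Dispatcher {
  def dispatcherId: String
  def shutdown(): Unit
}

// Factory-level lifecycle hook: because the factory knows how many
// dispatchers exist overall, it can keep a lone dispatcher warm but
// eagerly free resources when many dispatchers compete for them.
trait DispatcherFactory {
  def activeDispatcherCount: Int

  // Called by the manager when the last session of `dispatcher` closes.
  def onAllDispatcherSessionsClosed(dispatcher: Dispatcher): Unit = {
    if (activeDispatcherCount > 1) {
      dispatcher.shutdown() // many dispatchers: reclaim resources now
    }
    // Otherwise keep the single dispatcher warm for reuse.
  }
}
```

A per-dispatcher `onStop` (as the reviewer suggests) could coexist with this: the factory decides *when* to dispose, the dispatcher decides *how*.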

* clean up any dispatchers/resources they hold beyond what the
* [[UDFDispatcherManager]] manages.
*/
def onStop(): Unit
Contributor:

Same as above: the factory pattern means it only defines the object-creation logic; onStop should be added to the dispatcher instance.

private val lock = new Object
private val dispatchers =
new HashMap[UDFWorkerSpecification, DispatcherEntry]()
private val activeSessions = new ArrayList[WorkerSession]()
Contributor:

This logic looks like dispatcher implementation details; e.g. a properly implemented dispatcher should manage its active sessions itself.

A dispatcher spawns workers and creates sessions, so it is more natural for the dispatcher to manage the sessions it starts.

Contributor (continued):

For example, the dispatcher manager here creates dispatchers, so it takes care of disposing of them.

A dispatcher spawns workers and sessions, so the disposal of sessions and workers should live in the dispatcher.

Because creation and disposal logic are related, it is better to colocate them.
